c3f15d8c3328dc8b54e1cb6bf5b32b3ef46ba74c,thresh/src/main/java/monasca/thresh/TopologyModule.java,TopologyModule,topology,#,93

Before Change


            new AlarmThresholdingBolt(config.database, config.kafkaProducerConfig),
            config.thresholdingBoltThreads)
        .fieldsGrouping("aggregation-bolt", new Fields(MetricAggregationBolt.FIELDS[0]))
        .fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID,
            new Fields(EventProcessingBolt.ALARM_EVENT_STREAM_FIELDS[1]))
        .setNumTasks(config.thresholdingBoltTasks);

    return builder.createTopology();
  }

After Change


            new AlarmThresholdingBolt(config.database, config.kafkaProducerConfig),
            config.thresholdingBoltThreads)
        .fieldsGrouping("aggregation-bolt", new Fields(MetricAggregationBolt.FIELDS[0]))
        .fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID,
            new Fields(EventProcessingBolt.ALARM_EVENT_STREAM_FIELDS[1]))
        .allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID)
        .setNumTasks(config.thresholdingBoltTasks);

    return builder.createTopology();
  }